In [30]:
import json
import pandas as pd
import glob
from pathlib import Path
import networkx as nx
import matplotlib.pyplot as plt
import sys
import re
import collections
import os
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit


In [2]:
# Aggregate statistics, filled in by process_playlist() / process_mpd().

# Scalar running totals.
total_playlists = total_tracks = total_descriptions = 0

# Distinct identifiers / titles seen across all playlists.
# The generator creates a separate, independent set for each name.
tracks, artists, albums, titles, ntitles = (set() for _ in range(5))

# Frequency tables; again one independent Counter per name.
(
    title_histogram,
    artist_histogram,
    track_histogram,
    last_modified_histogram,
    num_edits_histogram,
    playlist_length_histogram,
    num_followers_histogram,
) = (collections.Counter() for _ in range(7))

# Quick mode: limit how many slice files process_mpd() reads.
quick = False
max_files_for_quick_processing = 5

In [4]:
def normalize_name(name):
    """Canonicalize a playlist title for grouping.

    Lower-cases the text, turns each listed punctuation character into a
    space, then collapses whitespace runs to single spaces and trims the ends.

    Args:
        name: raw title string.

    Returns:
        The normalized title (possibly empty).
    """
    lowered = name.lower()
    despaced = re.sub(r"[.,\/#!$%\^\*;:{}=\_`~()@]", " ", lowered)
    # split()/join collapses any whitespace run and strips both ends.
    return " ".join(despaced.split())


def to_date(epoch, tz=datetime.timezone.utc):
    """Format a Unix timestamp (seconds) as a ``YYYY-MM-DD`` string.

    The conversion is done in UTC by default, so the result does not depend on
    the machine running the notebook. The ``modified_at`` values seen in this
    dataset fall exactly on UTC midnights. Converting them in a local timezone
    west of UTC would shift every date back by one day.

    Args:
        epoch: seconds since the Unix epoch.
        tz: timezone to convert into. Pass ``None`` to use the machine's local
            timezone, which matches the previous behaviour.

    Returns:
        The calendar date as ``"%Y-%m-%d"``.
    """
    return datetime.datetime.fromtimestamp(epoch, tz=tz).strftime("%Y-%m-%d")


def process_playlist(playlist):
    """Fold one playlist record into the module-level statistics.

    Updates these values:

    * ``total_playlists`` always; ``total_descriptions`` when the record has a
      ``"description"`` key.
    * ``titles`` with the raw name; ``ntitles`` and ``title_histogram`` with
      the name after normalize_name().
    * The length, last-modified, edit-count and follower histograms, keyed by
      the record's ``num_tracks``, ``modified_at``, ``num_edits`` and
      ``num_followers`` values.
    * For every entry in ``playlist["tracks"]``: ``total_tracks``, the
      album/track/artist URI sets, ``artist_histogram`` (by artist name) and
      ``track_histogram`` (by ``"<track> by <artist>"``).

    Raises:
        KeyError: if any of the keys read above is missing.
    """
    global total_playlists, total_tracks, total_descriptions

    total_playlists += 1
    if "description" in playlist:
        total_descriptions += 1

    raw_title = playlist["name"]
    titles.add(raw_title)
    normalized = normalize_name(raw_title)
    ntitles.add(normalized)
    title_histogram[normalized] += 1

    # Same update order as listed in the docstring.
    for histogram, field in (
        (playlist_length_histogram, "num_tracks"),
        (last_modified_histogram, "modified_at"),
        (num_edits_histogram, "num_edits"),
        (num_followers_histogram, "num_followers"),
    ):
        histogram[playlist[field]] += 1

    for entry in playlist["tracks"]:
        total_tracks += 1
        albums.add(entry["album_uri"])
        tracks.add(entry["track_uri"])
        artists.add(entry["artist_uri"])

        track_name, artist_name = entry["track_name"], entry["artist_name"]
        label = " by ".join((track_name, artist_name))
        artist_histogram[artist_name] += 1
        track_histogram[label] += 1


def process_info(_):
    """Hook for a slice file's ``info`` header; intentionally does nothing.

    process_mpd() calls this once per slice file. The argument is accepted and
    ignored.
    """
    return None

def process_mpd(path):
    """Feed every ``mpd.slice.*.json`` file in *path* through the statistics.

    For each matching file, this function:

    1. parses the file as JSON;
    2. hands its ``"info"`` object to process_info();
    3. passes each entry of ``"playlists"`` to process_playlist().

    Files are visited in plain lexicographic filename order. For example,
    ``mpd.slice.10000-10999.json`` sorts before ``mpd.slice.2000-2999.json``.
    When the global ``quick`` flag is set, at most
    ``max_files_for_quick_processing`` slice files are read.

    Args:
        path: directory containing the slice files.

    Returns:
        The number of slice files processed.

    Raises:
        KeyError: if a slice lacks ``"info"`` or ``"playlists"``.
        json.JSONDecodeError: if a slice file is not valid JSON.
    """
    processed = 0
    for filename in sorted(os.listdir(path)):
        if not (filename.startswith("mpd.slice.") and filename.endswith(".json")):
            continue
        # Check before reading so quick mode processes exactly
        # max_files_for_quick_processing files. The old post-increment `>`
        # test read one file too many.
        if quick and processed >= max_files_for_quick_processing:
            break

        fullpath = os.path.join(path, filename)
        # The context manager closes the handle even if parsing fails.
        # Use UTF-8 explicitly rather than the platform locale encoding.
        with open(fullpath, encoding="utf-8") as slice_file:
            mpd_slice = json.load(slice_file)

        process_info(mpd_slice["info"])
        for playlist in mpd_slice["playlists"]:
            process_playlist(playlist)
        processed += 1

    return processed

In [9]:
# Local, single-node Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

22/11/20 12:36:52 WARN Utils: Your hostname, phu-ThinkPad-T480 resolves to a loopback address: 127.0.1.1; using 192.168.0.22 instead (on interface wlp3s0)
22/11/20 12:36:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/20 12:36:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [42]:
# Load the flattened CSV export. The first row supplies column names.
# No schema inference is requested, so every column is read as a string.
df = (
    spark.read
    .option("header", True)
    .csv("data_smaller")
)

In [43]:
# Write the combined rows back out, with a header row.
# mode("overwrite"): Spark's default save mode raises if the target directory
# already exists. Without it, re-running this cell (or Restart & Run All) fails.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv("data_for_neo4jadmin/combined_data")
)

[Stage 33:>                                                         (0 + 1) / 1]

In [38]:
# Unique (uri, name, duration) combinations. In Spark, distinct() is
# implemented as dropDuplicates() with no column subset.
df_track_node = (
    df
    .select("track_uri", "track_name", "duration_ms")
    .dropDuplicates()
)

In [41]:
# Inspect the column types of the de-duplicated track frame.
# All three columns come back as strings because the CSV was loaded with only
# the "header" option (no schema inference).
df_track_node.printSchema()

root
 |-- track_uri: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- duration_ms: string (nullable = true)



In [31]:
# Build the Track node file for the Neo4j import.
# Each source row pairs a playlist with a track (the frame carries both
# playlist_pid and track_uri columns), so the same track appears once per
# playlist it is in. De-duplicate so each track is emitted as a single node.
# NOTE(review): presumably track_uri is the :ID column in the separately
# supplied header; confirm against the neo4j-admin header file.
df_track_node = (
    df.select("track_uri", "track_name", "duration_ms")
    .distinct()
    .withColumn("label", lit("Track"))
)

# Headerless output. mode("overwrite") keeps the cell re-runnable, because
# Spark's default save mode errors when the directory already exists.
(
    df_track_node.write
    .mode("overwrite")
    .option("header", False)
    .csv("data_for_neo4jadmin/Tracks")
)

In [35]:
# Playlist-level attributes. This still has one row per source row (i.e.
# repeated for each track of a playlist). NOTE(review): de-duplicate before
# using this as a node set, as the display cell below does.
df_playlist_node = df.select(
    "playlist_pid",
    "name",
    "collaborative",
    "modified_at",
    "num_tracks",
    "num_albums",
    "num_followers",
    "num_edits",
    "duration_ms_playlist",
    "num_artists",
    "description",
)

In [37]:
# Preview the unique playlists, using show()'s defaults spelled out:
# the first 20 rows, with long cell values truncated.
df_playlist_node.dropDuplicates().show(n=20, truncate=True)

[Stage 26:>                                                         (0 + 1) / 1]

+--------------------+--------------+-------------+-----------+----------+----------+-------------+---------+--------------------+-----------+-----------+
|        playlist_pid|          name|collaborative|modified_at|num_tracks|num_albums|num_followers|num_edits|duration_ms_playlist|num_artists|description|
+--------------------+--------------+-------------+-----------+----------+----------+-------------+---------+--------------------+-----------+-----------+
|                 104|          JAMS|        false| 1458604800|        65|        30|            1|        7|            15475350|         26|       null|
|                 111|       country|        false| 1507420800|        24|        20|            1|       13|             5198964|         16|       null|
|                 441|     chill out|        false| 1462665600|       203|       176|            4|       46|            49593533|        141|       null|
|                 762|    Gospel Mix|        false| 1506816000|       

                                                                                