In [1]:
#Importação das bibliotecas.
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import Row as Row
from pyspark.sql.functions import col, split
import random
import pandas

spark = SparkSession.builder\
.appName('pmd22')\
.config("spark.jars","neo4j-connector-apache-spark_2.12-4.1.4_for_spark_3.jar")\
.getOrCreate()


In [2]:
#Lendo arquivo de usuarios
dat_usuarios = spark.read.load("B:\Faculdade\PMD\people_music.csv", format="csv", sep=",", inferSchema="true", header="true")

#Lendo arquivo de musicas
dat_music = spark.read.load("B:\Faculdade\PMD\music.csv", format="csv", sep=",", inferSchema="true", header="true")

dat_usuarios.show();
dat_music.show();

+---------+----------+-----------+-------------------+
|id_person|      nome|  music_fav|             amigos|
+---------+----------+-----------+-------------------+
|        1|     Lorne|404,123,586|  1369,1744,406,223|
|        2| Gavrielle| 384,41,550|  845,1134,1933,823|
|        3|     Cordi|  52,391,39|    352,528,998,976|
|        4|    Florie| 368,155,76|  736,1172,1584,443|
|        5|    Odessa|536,590,507| 1783,1497,1915,425|
|        6|    Sandie| 67,159,221|   234,506,704,1583|
|        7|       Sam| 395,22,559| 1218,1870,1175,389|
|        8|    Leanna|245,159,491|   628,1192,430,198|
|        9|   Jasmina| 114,32,331| 1025,1227,691,1089|
|       10|    Lolita|  190,33,47|   63,1417,929,1706|
|       11|Anestassia|452,536,480|  768,628,1983,1258|
|       12|   Chandra|244,123,166|      640,729,9,421|
|       13|       Raf| 560,413,28|1727,1986,1741,1643|
|       14|    Teddie|315,192,468|    135,120,1755,62|
|       15|    Melina|  60,42,590| 1433,1997,863,1239|
|       16

In [3]:
#Renomeando colunas necessárias
dat_music = dat_music.withColumnRenamed("title","titulo") \
    .withColumnRenamed("artist","artista") \
.withColumnRenamed("the genre of the track","genero") \
.withColumnRenamed("year","ano") \
.withColumnRenamed("Beats.Per.Minute -The tempo of the song","BPM") \
.withColumnRenamed("Energy- The energy of a song - the higher the value the more energtic","energia") \
.withColumnRenamed("Danceability - The higher the value the easier it is to dance to this song","dancabilidade") \
.withColumnRenamed("Loudness/dB - The higher the value the louder the song","volume") \
.withColumnRenamed("Liveness - The higher the value the more likely the song is a live recording","liveness") \
.withColumnRenamed("Valence - The higher the value the more positive mood for the song","valencia") \
.withColumnRenamed("Length - The duration of the song","comprimento") \
.withColumnRenamed("Acousticness - The higher the value the more acoustic the song is","acustica") \
.withColumnRenamed("Speechiness - The higher the value the more spoken word the song contains","fala") \
.withColumnRenamed("Popularity- The higher the value the more popular the song is","popularidade") 

dat_music.printSchema()


root
 |-- titulo: string (nullable = true)
 |-- artista: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- BPM: string (nullable = true)
 |-- energia: integer (nullable = true)
 |-- dancabilidade: integer (nullable = true)
 |-- volume: integer (nullable = true)
 |-- liveness: integer (nullable = true)
 |-- valencia: integer (nullable = true)
 |-- comprimento: integer (nullable = true)
 |-- acustica: integer (nullable = true)
 |-- fala: integer (nullable = true)
 |-- popularidade: integer (nullable = true)



In [4]:
#Excluindo colunas nao utilizadas
dat_music = dat_music.drop("volume", "liveness","acustica","fala","comprimento")

#Adicionando coluna ‘Id_music’ no dataset de musicas
windowSpec = Window.orderBy("titulo")
dat_music = dat_music.withColumn("id_music", F.row_number().over(windowSpec))

dat_music.show()

+--------------------+---------------+----------------+----+---+-------+-------------+--------+------------+--------+
|              titulo|        artista|          genero| ano|BPM|energia|dancabilidade|valencia|popularidade|id_music|
+--------------------+---------------+----------------+----+---+-------+-------------+--------+------------+--------+
|...Ready For It? ...|   Taylor Swift|             pop|2018|160|     84|           58|      50|          52|       1|
|                 1+1|        Beyonce|       dance pop|2011| 63|     38|           30|      26|          60|       2|
|           24K Magic|     Bruno Mars|             pop|2017|107|     80|           82|      63|          69|       3|
|2U (feat. Justin ...|   David Guetta|       dance pop|2018|145|     61|           53|      53|          65|       4|
|                   3| Britney Spears|       dance pop|2010|135|     71|           70|      79|          62|       5|
|               43776|        Beyonce|       dance pop|2

In [5]:
#Gerar o csv antes de inserir no couch
# dat_music.coalesce(1).write.csv(
#         path='B:\Faculdade\PMD\musicWithIds',
#         mode="overwrite",
#         header=True,
#         sep=",")

datpandas = dat_music.toPandas().to_csv('B:\Faculdade\PMD\musicWithIds\music.csv', sep=',', encoding='utf-8', index = False)

In [6]:
#Conexão com Couchdb
import csv
import couchdb 

couch = couchdb.Server('http://admin:123456789@127.0.0.1:5984')

dbname = "musicas" #Nome do banco


In [7]:
#Criação do Banco de Dados e Inserção dos dados da fonte
try:
    db = couch[dbname]
except:
    db = couch.create(dbname)
    with open(r'B:\Faculdade\PMD\musicWithIds\music.csv', encoding="utf-8") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        keys = next(csv_reader)
        line_count = 0
        for values in csv_reader:
            values[3] = int(values[3])if values[3].isdigit() else 0
            values[4] = int(values[4])if values[4].isdigit() else 0
            values[5] = int(values[5])if values[5].isdigit() else 0
            values[6] = int(values[6])if values[6].isdigit() else 0
            values[7] = int(values[7])if values[7].isdigit() else 0
            values[8] = int(values[8])if values[8].isdigit() else 0
            db.save(dict(zip(keys, values)))

In [8]:
#Preparando os Nodes para o Neo4j
dat_music4j = dat_music.select(dat_music["id_music"], dat_music["titulo"])

dat_usuarios = dat_usuarios.withColumn("most_list", split(col("music_fav"), ",").getItem(0))
dat_usuarios4j = dat_usuarios.select(dat_usuarios["id_person"], dat_usuarios["nome"],dat_usuarios["most_list"] )


In [9]:
#Preparando os Relationships para o Neo4j
columnsPerson = ["id_person", "id_amigo"]
columnsMusic = ["id_person", "id_music"]
arrPerson = []
arrMusic = []
dat_usu_amigo = spark.sparkContext.emptyRDD()
for row in dat_usuarios.collect():
    vetAmigos = row["amigos"].split(",")
    vetMusics = row["music_fav"].split(",")
    
    for amigo in vetAmigos:
        arrPerson.append((row["id_person"],int(amigo)))
        
    for music in vetMusics:
        arrMusic.append((row["id_person"],int(music)))  
    
        
usu_amigo = spark.createDataFrame(arrPerson, columnsPerson)
usu_music = spark.createDataFrame(arrMusic, columnsMusic)


usu_amigo.show();
usu_music.show();

+---------+--------+
|id_person|id_amigo|
+---------+--------+
|        1|    1369|
|        1|    1744|
|        1|     406|
|        1|     223|
|        2|     845|
|        2|    1134|
|        2|    1933|
|        2|     823|
|        3|     352|
|        3|     528|
|        3|     998|
|        3|     976|
|        4|     736|
|        4|    1172|
|        4|    1584|
|        4|     443|
|        5|    1783|
|        5|    1497|
|        5|    1915|
|        5|     425|
+---------+--------+
only showing top 20 rows

+---------+--------+
|id_person|id_music|
+---------+--------+
|        1|     404|
|        1|     123|
|        1|     586|
|        2|     384|
|        2|      41|
|        2|     550|
|        3|      52|
|        3|     391|
|        3|      39|
|        4|     368|
|        4|     155|
|        4|      76|
|        5|     536|
|        5|     590|
|        5|     507|
|        6|      67|
|        6|     159|
|        6|     221|
|        7|     395|
|       

In [10]:
#Escrevendo usuários no neo4j
dw_usuarios = (dat_usuarios4j.write\
.format("org.neo4j.spark.DataSource")\
.mode("Overwrite")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("labels", ":Person")\
.option("node.keys", "id_person")\
.save())

In [11]:
#Escrevendo músicas no neo4j
dw_musics = (dat_music4j.write\
.format("org.neo4j.spark.DataSource")\
.mode("Overwrite")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("labels", ":Music")\
.option("node.keys", "id_music")\
.save())

In [12]:
#Fazendo as relações de amigos
usu_amigo.write\
.format("org.neo4j.spark.DataSource")\
.mode("Overwrite")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("relationship", "amigo_de")\
.option("relationship.save.strategy", "keys")\
.option("relationship.source.labels", ":Person")\
.option("relationship.source.node.keys", "id_person:id_person")\
.option("relationship.target.labels", ":Person")\
.option("relationship.target.node.keys", "id_amigo:id_person")\
.save()

In [13]:
#Fazendo as relações de musicas favoritas
usu_music.write\
.format("org.neo4j.spark.DataSource")\
.mode("Overwrite")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("relationship", "music_fav")\
.option("relationship.save.strategy", "keys")\
.option("relationship.source.labels", ":Person")\
.option("relationship.source.node.keys", "id_person:id_person")\
.option("relationship.target.labels", ":Music")\
.option("relationship.target.node.keys", "id_music:id_music")\
.save()

In [32]:
#Consulta 1 - Retornar as músicas que tenham valores próximos de BPM, energia, dançabilidade e valência da música favorita do usuário.(Documentos)
idClientC1 = '1534'
allDocs =[]

#grafo
c1_mIds = spark.read\
.format("org.neo4j.spark.DataSource")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("query","MATCH (p:Person) WHERE p.id_person = " + idClientC1 + " RETURN p.most_list")\
.load()

#Musica favorita do cliente
for doc in db.find({
   'selector': { 
        "id_music": {
            "$eq": c1_mIds.collect()[0][0]
        }
    },
    "fields": ["BPM", "energia", "dancabilidade", "valencia","titulo", "id_music"] 
    }):
    fav = doc

print(fav["BPM"], fav["energia"], fav["dancabilidade"], fav["valencia"], fav["titulo"], "\n\n\n")

#Consulta baseada no intervalo
for doc in db.find({
   "selector": {
      "$and": [
         {
            "BPM": {
               "$and": [
                  {
                     "$gte": int(fav["BPM"]) - 8
                  },
                  {
                     "$lte": int(fav["BPM"]) + 8
                  }
               ]
            }
         },
         {
            "energia": {
                "$and": [
                  {
                     "$gte": int(fav["energia"]) - 6
                  },
                  {
                     "$lte": int(fav["energia"]) + 6
                  }
               ]
            }
         },
         {
            "dancabilidade": {
                "$and": [
                  {
                     "$gte": int(fav["dancabilidade"]) - 6
                  },
                  {
                     "$lte": int(fav["dancabilidade"]) + 6
                  }
               ]
            }
         },
         {
            "valencia": {
                "$and": [
                  {
                     "$gte": int(fav["valencia"]) - 8
                  },
                  {
                     "$lte": int(fav["valencia"]) + 8
                  }
               ]
            }
         }
      ]
   },
    "limit": 200
    
}):
    print(doc["BPM"], doc["energia"], doc["dancabilidade"], doc["valencia"], doc["titulo"])

125 94 60 55 We Are One (Ole Ola) [The Official 2014 FIFA World Cup Song] 



118 88 65 49 Best Song Ever
131 93 66 53 Judas
128 93 57 58 Sweet Nothing (feat. Florence Welch)
124 93 63 47 This Is What You Came For
126 92 54 51 Under Control
125 94 60 55 We Are One (Ole Ola) [The Official 2014 FIFA World Cup Song]


In [76]:
#Consulta 2 - Retornar as músicas agrupadas pelos gêneros das músicas favoritas do usuário ordenadas por ano.(Documentos)
idClientC2 = '657'
musicsFav = []
musicsgens = []

#Busca as músicas favorita do cliente
c1_mIds = spark.read\
.format("org.neo4j.spark.DataSource")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("query","MATCH (p:Person) - [re:music_fav] -> (m:Music) WHERE p.id_person =" + idClientC2 + " RETURN m.id_music")\
.load()

for item in c1_mIds.collect():
    musicsFav.append(str(item[0]))
    
print(musicsFav)
#Genero da musica favorita do cliente
for doc in db.find({
   "selector": {
      "id_music": {
         "$or": musicsFav
      }
   },
    "sort": [{"ano":"asc"}]
    }):
    musicsgens.append(doc["genero"])
print(musicsgens)
for doc in db.find({
   "selector": {
      "$or": [
         {
            "genero": {
               "$or": musicsgens
            }
         }
      ]
   },
   "sort": [
        {
            "genero": "asc"
        },
        {
             "ano": "asc"
        }
   ],
    "limit": 603
    }):
    print(doc["genero"], doc["ano"])

['577', '480', '8']
['art pop', 'permanent wave', 'dance pop']
art pop 2010
art pop 2013
art pop 2014
art pop 2015
art pop 2016
art pop 2017
art pop 2017
art pop 2017
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2010
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance pop 2011
dance po

In [54]:
#Consulta 3 - Retornar uma playlist com músicas de pessoas que compartilhem a mesma música favorita, percorrendo cinco níveis de profundidade. (Grafo + Documento)
idClientC3 = '45'
allIds =[]

c3_mIds = spark.read\
.format("org.neo4j.spark.DataSource")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("query","MATCH (p:Person) - [fav:music_fav] -> (m:Music) <- [mu:music_fav*1..5] - (p2:Person) - [mu2:music_fav] -> (demais_music:Music) WHERE m.id_music = toInteger(p.most_list) AND p.id_person = "+ idClientC3+" AND demais_music.id_music = toInteger(p2.most_list) AND p.id_person <> p2.id_person RETURN demais_music.id_music")\
.load()


for idmusic in c3_mIds.collect():
    allIds.append(str(idmusic[0]))
    
    
for doc in db.find({
   "selector": {
      "id_music": {
         "$or": allIds
      }
   },
    "limit": 200
    }):
    print(doc["titulo"], doc["artista"], doc["genero"], doc["ano"]) 


Doesn't Mean Anything Alicia Keys hip pop 2010
Dog Days Are Over Florence + The Machine art pop 2010
End Game Taylor Swift pop 2018
I Wanna Go Britney Spears dance pop 2011
Impossible James Arthur pop 2013
Rise Katy Perry dance pop 2016
Sparks Hilary Duff dance pop 2015
Want To Dua Lipa dance pop 2018


In [35]:
#Consulta 4 - Retornar uma playlist levando em consideração as músicas favoritas de amigos em até três níveis de profundidade, ordenada pela popularidade. (Grafo + Documento)
idClientC4 = '865'
allIds =[]

c4_mIds = spark.read\
.format("org.neo4j.spark.DataSource")\
.option("url", "neo4j://localhost:7687")\
.option("authentication.type","basic")\
.option("authentication.basic.username", "neo")\
.option("authentication.basic.password", "1234")\
.option("query","MATCH (p:Person) - [am:amigo_de*1..3] -> (p2:Person) - [mu:music_fav] -> (p2mu:Music) \
        WHERE p.id_person = "+idClientC4+"\
        RETURN p2mu.id_music")\
.load()

for idmusic in c4_mIds.collect():
    allIds.append(str(idmusic[0]))
    
for doc in db.find({
   "selector": {
      "id_music": {
         "$or": allIds
      }
   },
    "limit": 200
    }):
    print(doc["titulo"], doc["artista"], doc["genero"], doc["ano"]) 


3 Britney Spears dance pop 2010
Alejandro Lady Gaga dance pop 2010
All I Ask Adele british soul 2016
All I Ask Adele british soul 2017
All The Right Moves OneRepublic dance pop 2010
All We Know The Chainsmokers electropop 2016
All of Me John Legend neo mellow 2014
Angel Fifth Harmony dance pop 2017
Anything Could Happen Ellie Goulding dance pop 2013
Atlas - From 揟he Hunger Games: Catching Fire?Soundtrack Coldplay permanent wave 2013
Attention Charlie Puth dance pop 2018
BURNITUP Janet Jackson dance pop 2016
Bad Liar Selena Gomez dance pop 2018
Bad Romance Lady Gaga dance pop 2010
Bang Bang Jessie J australian pop 2015
Beauty And A Beat Justin Bieber canadian pop 2012
Beneath Your Beautiful Labrinth pop 2013
Best Thing I Never Had Beyonce dance pop 2011
Blow Me (One Last Kiss) Pink dance pop 2012
Bodak Yellow Cardi B pop 2017
Body Moves DNCE dance pop 2017
Body Say Demi Lovato dance pop 2016
Bon app閠it Katy Perry dance pop 2017
Boom Boom RedOne moroccan pop 2018
Break Free Ariana Grande