In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyravendb.store import document_store

In [None]:
from random import randrange, uniform, randint
import random

### **Inicializa o Pyspark**

In [None]:
spark = SparkSession\
 .builder\
 .appName("projeto-pmd")\
 .master("local[*]")\
 .config("spark.jars", "neo4j-connector-apache-spark_2.12-4.1.4_for_spark_3.jar")\
 .getOrCreate()

### **Manipulação dos dados do dataset de pessoas para criar os usuários**

In [None]:
usersDF = spark.read.csv("./datasets/users.csv", header=True, sep=',')

In [None]:
usersDF.show()

In [None]:
usersDF = usersDF.select("Id", "Name").withColumnRenamed('Id', 'id').withColumnRenamed('Name', 'nome')

**remove duplicatas**

In [None]:
usersDF = usersDF.dropDuplicates(subset=['id', 'nome'])

In [None]:
usersDF.show()
usersDF.count()

**remove dados que possuem o nomes escritos em chinês, arabe, etc**

In [None]:
usersDF = usersDF.select("id", "nome").where(col("nome").rlike("^[a-zA-Z]+")).orderBy(col('id'))

In [None]:
usersDF.show()
usersDF.count()

**seleciona uma parte dos dados para armazenar na base de dados Neo4J**

In [None]:
usersDF_sample = usersDF.limit(2000)

In [None]:
usersDF_sample.show()

 **Aqui é so para armazenar uma amostra especifica dos dados**

In [None]:
#usersDF_sample.write.csv("./datasets/users_sample", header=True, sep=',')
#usersDF_sample = spark.read.csv("datasets/user_sample.csv", sep=",")

**armazena os nos que representam os usuários no Neo4J**

In [None]:
(usersDF_sample.write
 .format("org.neo4j.spark.DataSource")
 .mode("Overwrite")
 .option("url", "bolt://localhost:11003")
 .option("authentication.basic.username", "neo4j")
 .option("authentication.basic.password", "1234")
 .option("labels", ":User")
 .option("node.keys", "id")
 .option("schema.optimization.type", "INDEX")
 .save())

### **criação de forma randomica da lista de amizades**

**cria uma lista com os ids dos usuários**

In [None]:
list_ids = []
for i in usersDF_sample.collect():
    list_ids.append(i.id)

**controi uma lista com ids randomicos para cada usuário**

In [None]:
usersRDD = usersDF_sample.rdd.map(lambda x: (x.id, random.sample(list_ids, randint(3, 50))))

In [None]:
friendsDF = usersRDD.toDF(["id", "amigos"])

In [None]:
friendsDF.show()

**constroi um novo dataframe transformando os elemntos da lista de amigos de cada usuário em uma linha** 

In [None]:
friendsDF = friendsDF.select("id", explode("amigos").alias("amigo"))

In [None]:
friendsDF.show()

In [None]:
friendsDF = friendsDF.where(friendsDF.id != friendsDF.amigo)

In [None]:
friendsDF = friendsDF.withColumnRenamed("id", "source.id").withColumnRenamed("amigo", "target.id")

In [None]:
friendsDF.show()
friendsDF.count()

**Construção da rede de amizades no Neo4J**

In [None]:
(friendsDF.write
    .format("org.neo4j.spark.DataSource")
    .mode("overwrite")
    .option("url", "bolt://localhost:11003")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "1234")
    .option("relationship", "FOLLOW")
    .option("relationship.source.labels", ":User")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "source.id:id")
    .option("relationship.target.labels", ":User")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "target.id:id")
    .save()
 )

### **Adaptação dos dados para o RAVEN**

In [None]:
friendsDF = friendsDF.withColumnRenamed("source.id", "id").withColumnRenamed("target.id", "amigo")

In [None]:
friendsDF.show()

In [None]:
friendsDF = friendsDF.groupBy("id").agg(collect_list(friendsDF.amigo))

In [None]:
friendsDF = friendsDF.withColumnRenamed("collect_list(amigo)", "amigos")

In [None]:
friendsDF.show()

### **Leitura do dataset de livros**

In [None]:
booksDF = spark.read.csv("./datasets/books.csv", sep=";", header=True)

In [None]:
booksDF.count()

In [None]:
booksDF.show()

In [None]:
booksDF = booksDF.na.drop(subset=["Book-Title", "Book-Author", "Year-Of-Publication", "Publisher"])

In [None]:
booksDF.count()

In [None]:
booksDF = booksDF.select(["Book-Title", "Book-Author", "Year-Of-Publication", "Publisher"])

In [None]:
booksDF.show()

In [None]:
booksDF_sample = booksDF.limit(10000)

In [None]:
booksDF_sample.count()

### **Criação das estruturas dos anuncios**

In [None]:
anuncios = booksDF_sample.withColumn("anuncios", 
                    create_map(lit("Titulo"), col('Book-Title'),
                           lit("Autor"), col("Book-Author"), 
                           lit("Ano-Publicacao"), col("Year-Of-Publication"),
                           lit("Editora"), col("Publisher"))).drop('Book-Title', 'Book-Author', 'Year-Of-Publication', 'Publisher').withColumns({"valor": expr("round(rand() * 1000, 2)")})

In [None]:
anuncios.show()

In [None]:
list_ad = []
for an in anuncios.collect():
    list_ad.append({"Data": str(randint(2020, 2022)) + "-" + str(randint(1, 12)).zfill(2) + "-" + str(randint(1, 28)).zfill(2), "livro_descricao": an.anuncios, "preco": an.valor})

**Adiciona dados (telefone, email, lista de anuncios) aos usuários**

In [None]:
usersRDD = usersDF_sample.rdd.map(lambda x: (x.id, x.nome, "(" + str(randint(10, 99)) + ")" + str(randint(1000, 9999)) + "-" + str(randint(1000, 9999)), x.id + "@gmail.com", random.sample(list_ad, randint(3, 20))))

In [None]:
usersRDD.top(5)

In [None]:
usersRaven = usersRDD.toDF(["id", "nome", "telefone", "email", "anuncios_feitos"])

In [None]:
usersRaven.show()

In [None]:
users_friends = usersRaven.join(friendsDF, usersRaven.id == friendsDF.id).select(usersRaven.id, "nome", "amigos", "anuncios_feitos")

In [None]:
users_friends.show()

In [None]:
df2 = users_friends.withColumn("anuncios_recebidos", struct(col("id").alias("vendedor_id"), col("nome").alias("vendedor_nome"), col("anuncios_feitos").alias("anuncios"))).drop("nome", "anuncios_feitos")

In [None]:
df2.show()

In [None]:
adRecebidos = df2.select(explode("amigos").alias("amigo"), "anuncios_recebidos")

In [None]:
adRecebidos.show()

In [None]:
adRecebidos = adRecebidos.groupBy("amigo").agg(collect_list("anuncios_recebidos")).withColumnRenamed("amigo", "id")

In [None]:
adRecebidos = adRecebidos.withColumnRenamed("collect_list(anuncios_recebidos)", "anuncios_recebidos")

In [None]:
adRecebidos.show()

In [None]:
users = usersRaven.join(adRecebidos, usersRaven.id == adRecebidos.id).drop(adRecebidos.id)

In [None]:
users.show()

In [None]:
users.show()

### **Classes Usuário para armazenar os usuários no Raven**

In [None]:
class User: 
    
    def __init__(self, id, nome, telefone, email, anuncios_feitos=None, anuncios_recebidos=None):
        self.Id = id
        self.nome = nome
        self.telefone = telefone
        self.email = email
        self.anuncios_feitos = anuncios_feitos
        self.anuncios_recebidos = anuncios_recebidos
    
    def anunciar(self, book_ad):
        if book_ad is not None:
            self.meus_anuncios.append(book_ad);
    
    def receber_anuncio(self, book_ad):
        if book_ad is not None:
            self.amigos_anuncios.append(book_ad)

In [None]:
users.rdd.top(5)

In [None]:
users.show()

In [None]:
user_list = []
for i in users.rdd.collect():
    u = User(i.id, i.nome, i.telefone, i.email, i.anuncios_feitos, i.anuncios_recebidos)
    user_list.append(u)

**Conexão com o Raven**

In [None]:
with document_store.DocumentStore(
    urls=['http://localhost:8080'],
    database="Projeto-PMD"
) as store:
    store.initialize()

**Armazena usuários no RavenDb**

In [None]:
def adicona_usuarios(list_user):
    with store.open_session() as session1:
        for u in list_user:
            session1.store(u)
        session1.save_changes()

In [None]:
adicona_usuarios(user_list)