In [58]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

conf = SparkConf()
conf.set("spark.jars", "./neo4j-connector-apache-spark_2.12-5.0.1_for_spark_3.jar, ./mongo-spark-connector_2.11-2.0.0.jar")
conf.set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0")
conf.set("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('rede-devs') \
    .config(conf=conf) \
    .getOrCreate()

In [59]:
df = spark.read.csv("survey_results_public.csv", header=True)
df = df.select('Respondent', 'Age', 'LanguageWorkedWith', 'DevType', 'FrameWorkWorkedWith', 'DatabaseWorkedWith', 'PlatformWorkedWith', 'UndergradMajor', 'Country', 'FormalEducation', 'LanguageDesireNextYear', 'ConvertedSalary', 'Exercise', 'YearsCoding', 'YearsCodingProf', 'WakeTime', 'HoursComputer', 'HoursOutside')


In [180]:
from pyspark.sql.functions import col, split, array, when

result1 = df \
            .withColumn('LanguageWorkedWith', split(df['LanguageWorkedWith'], ';')) \
            .withColumn('LanguageDesireNextYear', split(df['LanguageDesireNextYear'], ';')) \
            .withColumn('DevType', split(df['DevType'], ';')) \
            .withColumn('FrameworkWorkedWith', split(df['FrameworkWorkedWith'], ';')) \
            .withColumn('DatabaseWorkedWith', split(df['DatabaseWorkedWith'], ';')) \
            .withColumn('PlatformWorkedWith', split(df['PlatformWorkedWith'], ';')) \
            .withColumn('ConvertedSalary', col('ConvertedSalary')/12 ) \
            .withColumn('Exercise', when(col('Exercise') == "I don't typically exercise", 0) \
                                   .when(col('Exercise') == '1 - 2 times per week', 1.5) \
                                   .when(col('Exercise') == '3 - 4 times per week', 3.5) \
                                   .when(col('Exercise') == 'Daily or almost every day', 7) \
                                   .otherwise(None)) \
            .withColumn('YearsCoding', when(col('YearsCoding') == "0-2 years", 1) \
                                      .when(col('YearsCoding') == "3-5 years", 4) \
                                      .when(col('YearsCoding') == "6-8 years", 7) \
                                      .when(col('YearsCoding') == "9-11 years", 10) \
                                      .when(col('YearsCoding') == "12-14 years", 13) \
                                      .when(col('YearsCoding') == "15-17 years", 16) \
                                      .when(col('YearsCoding') == "18-20 years", 19) \
                                      .when(col('YearsCoding') == "21-23 years", 22) \
                                      .when(col('YearsCoding') == "24-26 years", 25) \
                                      .when(col('YearsCoding') == "27-29 years", 28) \
                                      .when(col('YearsCoding') == "30 or more years", 30) \
                                      .otherwise(None)) \
            .withColumn('YearsCodingProf', when(col('YearsCodingProf') == "0-2 years", 1) \
                                          .when(col('YearsCodingProf') == "3-5 years", 4) \
                                          .when(col('YearsCodingProf') == "6-8 years", 7) \
                                          .when(col('YearsCodingProf') == "9-11 years", 10) \
                                          .when(col('YearsCodingProf') == "12-14 years", 13) \
                                          .when(col('YearsCodingProf') == "15-17 years", 16) \
                                          .when(col('YearsCodingProf') == "18-20 years", 19) \
                                          .when(col('YearsCodingProf') == "21-23 years", 22) \
                                          .when(col('YearsCodingProf') == "24-26 years", 25) \
                                          .when(col('YearsCodingProf') == "27-29 years", 28) \
                                          .when(col('YearsCodingProf') == "30 or more years", 30) \
                                          .otherwise(None)) \
            .withColumn('WakeTime', when(col('WakeTime') == "After 12:01 PM", "12:00") \
                                    .when(col('WakeTime') == 'Before 5:00 AM', "05:30") \
                                    .when(col('WakeTime') == 'Between 5:00 - 6:00 AM', "05:30") \
                                    .when(col('WakeTime') == 'Between 6:01 - 7:00 AM', "06:30") \
                                    .when(col('WakeTime') == 'Between 6:01 - 7:00 AM', "06:30") \
                                    .when(col('WakeTime') == 'Between 7:01 - 8:00 AM', "07:30") \
                                    .when(col('WakeTime') == 'Between 8:01 - 9:00 AM', "08:30") \
                                    .when(col('WakeTime') == 'Between 9:01 - 10:00 AM', "09:30") \
                                    .otherwise(None)) \
            .withColumn('HoursComputer', when(col('HoursComputer') == "Less than 1 hour", 1) \
                                        .when(col('HoursComputer') == '1 - 4 hours', 2.5) \
                                        .when(col('HoursComputer') == '5 - 8 hours', 6.5) \
                                        .when(col('HoursComputer') == '9 - 12 hours', 10.5) \
                                        .when(col('HoursComputer') == 'Over 12 hours', 12) \
                                        .otherwise(None)) \
            .withColumn('HoursOutside', when(col('HoursOutside') == "Less than 30 minutes", 0.35) \
                                        .when(col('HoursOutside') == '30 - 59 minutes', 0.5) \
                                        .when(col('HoursOutside') == '1 - 2 hours', 1.5) \
                                        .when(col('HoursOutside') == '3 - 4 hours', 3.5) \
                                        .when(col('HoursOutside') == 'Over 4 hours', 7) \
                                        .otherwise(None)) \
            .withColumn('Age', when(col('Age') == "Under 18 years old", 18) \
                                .when(col('Age') == "18 - 24 years old", 23) \
                                .when(col('Age') == "25 - 34 years old", 30) \
                                .when(col('Age') == "35 - 44 years old", 40) \
                                .when(col('Age') == "45 - 54 years old", 50) \
                                .when(col('Age') == "55 - 64 years old", 60) \
                                .when(col('Age') == "65 years or older", 65) \
                                .otherwise(None)) \


result1 = result1.limit(1000)

In [181]:
# Criação dos nós de DEV (pesado)

from faker import Faker
from pyspark.sql.functions import col, lit, expr, udf
from pyspark.sql.types import StringType

fake = Faker()

random_full_name = fake.name()

udf_func = udf(fake.name, StringType())


neo4jData = result1.select('Respondent') \
                   .withColumn("name", udf_func()) \
                   .withColumnRenamed('Respondent', 'code') \


neo4jData.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Developer") \
    .option("node.keys", "code, name") \
    .save()

neo4jData.show(4)

                                                                                

+----+--------------+
|code|          name|
+----+--------------+
|   1|    Rita Young|
|   3|Kenneth Steele|
|   4|   Scott Moore|
|   5|    Erin Mills|
+----+--------------+
only showing top 4 rows



In [182]:
# Criação dos atributos NEO4J
from pyspark.sql.functions import explode
import uuid


udf_id_func = udf(uuid.uuid4, StringType())

# Languages

languages = result1.select(explode('LanguageWorkedWith').alias("language")).distinct()
langs = languages \
            .withColumn("language", col("language").cast("string"))


langs.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Language") \
    .option("node.keys", "language") \
    .save()

In [183]:
#DevType

devtype = result1 \
                .select(explode('DevType').alias('devtype')).distinct() \
                .withColumn("devtype", col("devtype").cast("string")) 
                
devtype.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":DevType") \
    .option("node.keys", "devtype") \
    .save()

In [184]:
framework = result1 \
                .select(explode('FrameworkWorkedWith').alias('framework')).distinct() \
                .withColumn("framework", col("framework").cast("string"))
                
framework.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Framework") \
    .option("node.keys", "framework") \
    .save()

In [185]:
database = result1 \
                .select(explode('DatabaseWorkedWith').alias('database')).distinct() \
                .withColumn("database", col("database").cast("string"))
                
database.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Database") \
    .option("node.keys", "database") \
    .save()

In [186]:
platform = result1 \
                .select(explode('PlatformWorkedWith').alias('platform')).distinct() \
                .withColumn("platform", col("platform").cast("string"))
                
platform.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Platform") \
    .option("node.keys", "platform") \
    .save()

In [187]:

major = result1 \
                .select('UndergradMajor').distinct() \
                .withColumnRenamed('UndergradMajor', 'major') \
                .withColumn("major", col("major").cast("string"))
                
major.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Major") \
    .option("node.keys", "major") \
    .save()

In [188]:
country = result1 \
                .select('Country').distinct() \
                .withColumnRenamed('Country', 'country') \
                .withColumn("country", col("country").cast("string"))
                
country.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":Country") \
    .option("node.keys", "country") \
    .save()

In [189]:
formalEducation = result1 \
                .select('FormalEducation').distinct() \
                .withColumnRenamed('FormalEducation', 'formalEducation') \
                .withColumn("formalEducation", col("formalEducation").cast("string"))
                
formalEducation.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":FormalEducation") \
    .option("node.keys", "formalEducation") \
    .save()

In [201]:
languageDesire = result1 \
                .select(explode('LanguageDesireNextYear').alias('languageDesire')).distinct() \
                .withColumn("languageDesire", col("languageDesire").cast("string"))
                
languageDesire.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("labels", ":LanguageDesire") \
    .option("node.keys", "languageDesire") \
    .save()

In [191]:
formalRelations = result1.select('Respondent', 'FormalEducation') \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumnRenamed('FormalEducation', 'dst') \
                            .withColumn('relationship_type', lit('IS_IN')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

formalRelations.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "FormalEducation") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:formalEducation") \
    .option("relationship", "IS_IN") \
    .mode("overwrite") \
    .save()


In [192]:
langRel = result1.select('Respondent', explode('LanguageWorkedWith').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('WORKED_WITH')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

langRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Language") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:language") \
    .option("relationship", "WORKED_WITH") \
    .mode("overwrite") \
    .save()

                                                                                

In [193]:
devtypeRel = result1.select('Respondent', explode('DevType').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('IS_A')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

devtypeRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "DevType") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:devtype") \
    .option("relationship", "IS_A") \
    .mode("overwrite") \
    .save()

                                                                                

In [194]:
frameworkRel = result1.select('Respondent', explode('FrameworkWorkedWith').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('WORKED_WITH')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

frameworkRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Framework") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:framework") \
    .option("relationship", "FRAMEWORK_WORKED_WITH") \
    .mode("overwrite") \
    .save()

In [195]:
databaseRel = result1.select('Respondent', explode('DatabaseWorkedWith').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('DATABASE_WORKED_WITH')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

databaseRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Database") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:database") \
    .option("relationship", "DATABASE_WORKED_WITH") \
    .mode("overwrite") \
    .save()

                                                                                

In [196]:
platformRel = result1.select('Respondent', explode('PlatformWorkedWith').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('PLATFORM_WORKED_WITH')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

platformRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Platform") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:platform") \
    .option("relationship", "PLATFORM_WORKED_WITH") \
    .mode("overwrite") \
    .save()

                                                                                

In [197]:
majorRelations = result1.select('Respondent', 'UndergradMajor') \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumnRenamed('UndergradMajor', 'dst') \
                            .withColumn('relationship_type', lit('IS_IN')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

majorRelations.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Major") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:major") \
    .option("relationship", "MAJORED_IN") \
    .mode("overwrite") \
    .save()

In [198]:
countryRelations = result1.select('Respondent', 'Country') \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumnRenamed('Country', 'dst') \
                            .withColumn('relationship_type', lit('IS_FROM')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

countryRelations.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Country") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:country") \
    .option("relationship", "IS_FROM") \
    .mode("overwrite") \
    .save()

In [202]:
languageDesireRel = result1.select('Respondent', explode('LanguageDesireNextYear').alias('dst')) \
                            .withColumnRenamed('Respondent', 'src') \
                            .withColumn('relationship_type', lit('WANTS_TO_LEARN')) \
                            .withColumn("src", col("src").cast("string")) \
                            .withColumn("dst", col("dst").cast("string")) \
                            .withColumn("relationship_type", col("relationship_type").cast("string")) \

languageDesireRel.write \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "password") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", "Developer") \
    .option("relationship.target.labels", "Language") \
    .option("relationship.source.node.keys", "src:code") \
    .option("relationship.target.node.keys", "dst:language") \
    .option("relationship", "WANTS_TO_LEARN") \
    .mode("overwrite") \
    .save()

                                                                                

In [169]:
from faker import Faker
from pyspark.sql.functions import col, lit, expr, udf
from pyspark.sql.types import StringType

fake = Faker()

random_full_name = fake.name()

udf_func = udf(fake.name, StringType())


perfis = result1.select('Respondent', 
               'Age', 
               'ConvertedSalary', 
               'Exercise', 
               'YearsCoding', 
               'YearsCodingProf', 
               'WakeTime', 
               'HoursComputer', 
               'HoursOutside') \
            .withColumn("Name", udf_func()) \
            .withColumnRenamed('Respondent', 'Code')

perfis.write.format("mongodb") \
  .option("uri", "mongodb://localhost:27017") \
  .option("spark.mongodb.write.database", "rede-devs") \
  .option("spark.mongodb.write.collection", "developers") \
  .mode("append") \
  .save()

                                                                                

In [57]:
result1.count()

98855